package com.witsoft.manager;


import com.witsoft.device.utils.DateUtil;
import com.witsoft.thingsboard.domain.DeviceStatus;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.util.Map;
import java.util.concurrent.*;


/**
 * @desc 队列管理
 */
@Slf4j
@Component
public class MessageThreadManager {


    @Resource
    private RestTemplate restTemplate;

    //线程池所使用的缓冲队列大小
    private final static int WORK_QUEUE_SIZE = 50;

    @Autowired
    @Qualifier("messageScheduledExecutorService")
    private ScheduledExecutorService scheduledExecutorService;

    @Autowired
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;

    /**
     * 用于存储在队列中的消息，防止重复提交导致队列中存在重复数据 --后期优化为redis
     */
    Map<String, Object> cacheMap = new ConcurrentHashMap<>();


    @Value("${gongli.mes.host}")
    private String host;

    @Value("${gongli.mes.send-mes-api}")
    private String mesSendApi;

    @Value("${gongli.mes.sign}")
    private String sign;

    @Resource
    private QueueManager queueManager;



    @PostConstruct
    public void init(){
        //每2秒检查一下队列，将数据重新取出进行处理
        scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {

                //log.info("执行定时任务，判断缓冲队列是否存在数据...{}", DateUtil.sdf_s.format(new Date()));
                //判断缓冲队列是否存在记录
                if(!queueManager.msgQueueIsEmpty()){
                    //当线程池的队列容量小于WORK_QUEUE_SIZE, 则开始把缓冲池队列的事件，加入到线程池
                    BlockingQueue<Runnable> queue = threadPoolTaskExecutor.getThreadPoolExecutor().getQueue();
                    if(queue.size() < WORK_QUEUE_SIZE){
                        DeviceStatus poll = (DeviceStatus)queueManager.pool();

                        MessageThreadHandler messageThreadHandler = new MessageThreadHandler(poll, restTemplate, host, mesSendApi, sign);
                        threadPoolTaskExecutor.execute(messageThreadHandler);

                        log.info("将缓冲队列中的消息重新加到线程池中...");
                    }
                }
            }
        }, 10 , 2, TimeUnit.SECONDS);
    }


    /**
     * @desc 将任务加入消息线程池
     */
    public void addMsg(DeviceStatus event){
        //log.info("此设备事件准备加入到线程池中：{}", event.getDeviceName());

        MessageThreadHandler threadHandler = new MessageThreadHandler(event, restTemplate, host, mesSendApi, sign);
        threadPoolTaskExecutor.execute(threadHandler);
    }



    //线程池销毁
    @PreDestroy
    public void destroy(){

        //log.info("终止消息转发线程池和调度池：{}", scheduledFuture.cancel(false));
        log.info("终止消息转发线程池和调度池：{}");
        scheduledExecutorService.shutdown();
        threadPoolTaskExecutor.shutdown();
    }

}
